AWS Step FunctionsからAmazon Kinesis Data Firehoseに複数のJSONデータをPutRecordBatch APIでPutする(AWS CDK)
こんにちは、CX事業本部 IoT事業部の若槻です。
前回および前々回のエントリで、Step FunctionsからKinesis Data FirehoseへのJSONデータのPutをしてみました。
- AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータをPutする(AWS CDK) | DevelopersIO
- AWS Step FunctionsからAmazon Kinesis Data FirehoseにJSONデータを”改行コード付きで”Putする(AWS CDK) | DevelopersIO
その際に使用したAPIはPutRecordでした。PutRecordはリクエスト毎に1レコードしかPutできないため、複数レコードをPutしたい場合はPut毎にMap Stateを回すことになりますが、レコード数が多いと非効率です。
そこで今回は、AWS Step FunctionsからAmazon Kinesis Data Firehoseに、複数のJSONデータをPutRecordBatch APIで一括でPutする構成をAWS CDKで作ってみました。
やってみた
実装
AWS CDK v2(TypeScript)で次のようなCDKスタックを作成します。
import { Construct } from 'constructs'; import { aws_s3, aws_stepfunctions_tasks, aws_stepfunctions, Stack, StackProps, RemovalPolicy, Duration, Size, } from 'aws-cdk-lib'; import * as firehose_alpha from '@aws-cdk/aws-kinesisfirehose-alpha'; import * as firehose_destinations_alpha from '@aws-cdk/aws-kinesisfirehose-destinations-alpha'; export class ProcessStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); // データ格納バケット const dataBucket = new aws_s3.Bucket(this, 'dataBucket', { bucketName: `data-${this.account}-${this.region}`, removalPolicy: RemovalPolicy.DESTROY, }); // Kinesis Firehose Delivery Stream const deliveryStream = new firehose_alpha.DeliveryStream( this, 'deliveryStream', { deliveryStreamName: 'deliveryStream', destinations: [ new firehose_destinations_alpha.S3Bucket(dataBucket, { dataOutputPrefix: 'data/', errorOutputPrefix: 'error/!{firehose:error-output-type}/', bufferingInterval: Duration.seconds(60), bufferingSize: Size.mebibytes(64), }), ], } ); // 改行コード文字列の取得 const getNewLineCodeTask = new aws_stepfunctions.Pass( this, 'getNewLineCodeTask', { parameters: { value: '\n', }, resultPath: '$.getNewLineCodeTaskOutPut', } ); // レコードデータの配列を取得 const getRecordDataMap = new aws_stepfunctions.Map( this, 'getRecordDataMapTask', { itemsPath: aws_stepfunctions.JsonPath.stringAt('$.input.items'), parameters: { item: aws_stepfunctions.JsonPath.stringAt('$$.Map.Item.Value'), newLineCode: aws_stepfunctions.JsonPath.stringAt( '$.getNewLineCodeTaskOutPut.value' ), }, resultPath: '$.getRecordDataMapOutPut', } ).iterator( new aws_stepfunctions.Pass(this, 'getRecordDataPass', { parameters: { Data: aws_stepfunctions.JsonPath.format( '{}{}', aws_stepfunctions.JsonPath.jsonToString( aws_stepfunctions.JsonPath.stringAt('$.item') ), aws_stepfunctions.JsonPath.stringAt('$.newLineCode') ), }, resultPath: '$.getRecordDataPassOutPut', outputPath: aws_stepfunctions.JsonPath.stringAt( '$.getRecordDataPassOutPut' ), }) ); // Delivery streamへレコードをPutするタスク const putRecordBatchTask = new aws_stepfunctions_tasks.CallAwsService( this, 'putRecordBatchTask', { service: 'firehose', action: 'putRecordBatch', parameters: { DeliveryStreamName: deliveryStream.deliveryStreamName, Records: aws_stepfunctions.JsonPath.stringAt( '$.getRecordDataMapOutPut[*]' ), }, iamResources: [deliveryStream.deliveryStreamArn], iamAction: 'firehose:putRecordBatch', resultPath: aws_stepfunctions.JsonPath.DISCARD, } ); // State Machine new aws_stepfunctions.StateMachine(this, 'stateMachine', { stateMachineName: 'stateMachine', definition: getNewLineCodeTask .next(getRecordDataMap) .next(putRecordBatchTask), }); } }
- Map State(
getRecordDataMap
)を使用してJSONデータの配列を、JSON文字列 + 改行コード
という配列の形式に変換しているところが肝となります。
上記をCDK Deployしてスタックをデプロイすると、次の定義のState Machineが作成されます。
{ "StartAt": "getNewLineCodeTask", "States": { "getNewLineCodeTask": { "Type": "Pass", "ResultPath": "$.getNewLineCodeTaskOutPut", "Parameters": { "value": "\n" }, "Next": "getRecordDataMapTask" }, "getRecordDataMapTask": { "Type": "Map", "ResultPath": "$.getRecordDataMapOutPut", "Next": "putRecordBatchTask", "Parameters": { "item.$": "$$.Map.Item.Value", "newLineCode.$": "$.getNewLineCodeTaskOutPut.value" }, "Iterator": { "StartAt": "getRecordDataPass", "States": { "getRecordDataPass": { "Type": "Pass", "ResultPath": "$.getRecordDataPassOutPut", "Parameters": { "Data.$": "States.Format('{}{}', States.JsonToString($.item), $.newLineCode)" }, "OutputPath": "$.getRecordDataPassOutPut", "End": true } } }, "ItemsPath": "$.input.items" }, "putRecordBatchTask": { "End": true, "Type": "Task", "ResultPath": null, "Resource": "arn:aws:states:::aws-sdk:firehose:putRecordBatch", "Parameters": { "DeliveryStreamName": "deliveryStream", "Records.$": "$.getRecordDataMapOutPut[*]" } } } }
動作確認
次のInputを指定してState Machineを実行します。
{ "input": { "items": [ { "id": "u001", "count": 1, "area": "Akihabara" }, { "id": "u002", "count": 3, "area": "Ikebukuro" }, { "id": "u003", "count": 2, "area": "Shinjuku" } ] } }
State Machine実行が成功しました。
次のParameterを使用してPutRecordBatch APIが叩かれました。
{ "DeliveryStreamName": "deliveryStream", "Records": [ { "Data": "{\"id\":\"u001\",\"count\":1,\"area\":\"Akihabara\"}\n" }, { "Data": "{\"id\":\"u002\",\"count\":3,\"area\":\"Ikebukuro\"}\n" }, { "Data": "{\"id\":\"u003\",\"count\":2,\"area\":\"Shinjuku\"}\n" } ] }
Buffer期間が経過後に出力先Bucketの内容を確認すると、オブジェクトが1つ出力されています。
aws s3 ls s3://${BUCKET_NAME} --recursive 2022-08-10 22:56:34 128 data/2022/08/10/13/deliveryStream-1-2022-08-10-13-55-33-04431990-8c8d-465f-b41b-fcda45713eac
オブジェクトの内容を見ると、Putしたレコードがすべて記述されていますね!
{"id":"u001","count":1,"area":"Akihabara"} {"id":"u002","count":3,"area":"Ikebukuro"} {"id":"u003","count":2,"area":"Shinjuku"}
参考
- AWS Step Functionsでは組み込み関数だけで配列のフィルターやスライスができる(AWS CDK v2) | DevelopersIO
- Step Functionsの入出力処理の制御パラメータ(InputPath、 Parameters、ResultPathおよびOutputPath)を理解するために参照したドキュメント | DevelopersIO
以上